/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.fone.flumeExt.interceptor;

import static com.fone.flumeExt.interceptor.RegexReplaceAllInterceptor.Constants.DEFAULT_OUTCOME;
import static com.fone.flumeExt.interceptor.RegexReplaceAllInterceptor.Constants.OUTCOME;
import static com.fone.flumeExt.interceptor.RegexReplaceAllInterceptor.Constants.DEFAULT_REGEX;
import static com.fone.flumeExt.interceptor.RegexReplaceAllInterceptor.Constants.DEFAULT_LIMITS;
import static com.fone.flumeExt.interceptor.RegexReplaceAllInterceptor.Constants.LIMITS;
import static com.fone.flumeExt.interceptor.RegexReplaceAllInterceptor.Constants.REGEX;

import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Charsets;
import com.google.common.collect.Lists;

// TODO: Auto-generated Javadoc
/**
 * The Class RegexReplaceAllInterceptor.
 *
 * @author Phoenics Chow
 */
public class RegexReplaceAllInterceptor implements Interceptor {

	/** The Constant logger. */
	private static final Logger logger = LoggerFactory.getLogger(RegexReplaceAllInterceptor.class);

	/** The regex. */
	private final Pattern regex;

	/** The outcome. */
	private final String outcome;

	/** The limits. */
	private final String limits;

	/*
	 * 
	 * \t 制表符 ('\u0009') \n 新行（换行）符 (' ') \r 回车符 (' ') \f 换页符 ('\u000C') \a 报警
	 * (bell) 符 ('\u0007') \e 转义符 ('\u001B') \cx 对应于 x 的控制符
	 */

	/**
	 * Only {@link RegexReplaceAllInterceptor.Builder} can build me
	 *
	 * @param regex
	 *            the regex
	 * @param limits
	 *            the limits
	 * @param outcome
	 *            the outcome
	 */
	private RegexReplaceAllInterceptor(Pattern regex, String limits, String outcome) {
		String t = outcome.trim();
		if (!t.equals("")) {
			t = UnicodeToString(t);
		}
		this.regex = regex;
		this.outcome = t;

		this.limits = limits.toLowerCase().trim();
	}

	/**
	 * Unicode to string.
	 *
	 * @param str
	 *            the str
	 * @return the string
	 */
	public static String UnicodeToString(String str) {
		Pattern pattern = Pattern.compile("(\\\\u(\\p{XDigit}{4}))");
		Matcher matcher = pattern.matcher(str);
		char ch;
		while (matcher.find()) {
			ch = (char) Integer.parseInt(matcher.group(2), 16);
			str = str.replace(matcher.group(1), ch + "");
		}
		return str;
	}

	/*
	 * (non-Javadoc)
	 * 
	 * @see org.apache.flume.interceptor.Interceptor#initialize()
	 */
	@Override
	public void initialize() {
		// no-op
	}

	/*
	 * (non-Javadoc)
	 * 
	 * @see
	 * org.apache.flume.interceptor.Interceptor#intercept(org.apache.flume.Event
	 * )
	 */
	@Override
	/**
	 * Returns the event if it passes the regular expression filter and null
	 * otherwise.
	 */
	public Event intercept(Event event) {
		// We've already ensured here that at most one of includeRegex and
		// excludeRegex are defined.
		if (event == null) {
			return null;
		}
		try {
			String e_b = new String(event.getBody(), Charsets.UTF_8);
			Matcher matcher = regex.matcher(e_b);
			boolean r_f = matcher.find();
			if (!r_f) {
				return event;
			}
			String eventb = e_b;
			if (limits.equals("all")) {
				eventb = matcher.replaceAll(outcome);
			} else if (limits.equals("first")) {
				eventb = matcher.replaceFirst(outcome);
			} else {
				eventb = e_b;
			}
			event.setBody(eventb.getBytes(Charsets.UTF_8));
			return event;
		} catch (Exception e) {
			return event;
		}
	}

	/**
	 * Returns the set of events which pass filters, according to
	 * {@link #intercept(Event)}.
	 *
	 * @param events
	 *            the events
	 * @return the list
	 */
	@Override
	public List<Event> intercept(List<Event> events) {
		List<Event> out = Lists.newArrayList();
		for (Event event : events) {
			Event outEvent = intercept(event);
			if (outEvent != null) {
				out.add(outEvent);
			}
		}
		return out;
	}

	/*
	 * (non-Javadoc)
	 * 
	 * @see org.apache.flume.interceptor.Interceptor#close()
	 */
	@Override
	public void close() {
		// no-op
	}

	/**
	 * Builder which builds new instance of the StaticInterceptor.
	 *
	 * @author Phoenics Chow
	 */
	public static class Builder implements Interceptor.Builder {

		/** The regex. */
		private Pattern regex;

		/** The outcome. */
		private String outcome;

		/** The limits. */
		private String limits;

		/*
		 * (non-Javadoc)
		 * 
		 * @see
		 * org.apache.flume.conf.Configurable#configure(org.apache.flume.Context
		 * )
		 */
		@Override
		public void configure(Context context) {
			// excludeEvents=false;
			String regexString = context.getString(REGEX, DEFAULT_REGEX);
			regex = Pattern.compile(regexString);
			outcome = context.getString(OUTCOME, DEFAULT_OUTCOME);
			limits = context.getString(LIMITS, DEFAULT_LIMITS);
		}

		/*
		 * (non-Javadoc)
		 * 
		 * @see org.apache.flume.interceptor.Interceptor.Builder#build()
		 */
		@Override
		public Interceptor build() {
			logger.info(String.format("Creating RegexReplaceAllInterceptor: regex=%s,limits=%s,outcome=%s", regex, limits, outcome));
			return new RegexReplaceAllInterceptor(regex, limits, outcome);
		}
	}

	/**
	 * The Class Constants.
	 *
	 * @author Phoenics Chow
	 */
	public static class Constants {

		/** The Constant REGEX. */
		public static final String REGEX = "regex";

		/** The Constant DEFAULT_REGEX. */
		public static final String DEFAULT_REGEX = "";

		/** The Constant OUTCOME. */
		public static final String OUTCOME = "outcome";

		/** The Constant DEFAULT_OUTCOME. */
		public static final String DEFAULT_OUTCOME = "";

		/** The Constant DEFAULT_LIMITS. */
		public static final String DEFAULT_LIMITS = "all";// 这里只有两个值,all 和 first

		/** The Constant LIMITS. */
		public static final String LIMITS = "limits";// 这里只有两个值,all 和 first
	}
}
